GenHack 4 - ChefAI - Urban Heat Island Effect Visualization¶
- Group: ChefAI
- Period: 2 - Visualize and Explain the Urban Heat Island Effect
Introduction¶
In this notebook we will explore the Urban Heat Island (UHI) effect, which refers to the phenomenon where urban areas experience higher temperatures than their rural surroundings due to human activities and infrastructure.
We will visualize the UHI effect by analyzing temperature patterns in Emilia-Romagna, with a focus on the city of Bologna, all while considering other key factors like the NDVI index and the DEGURBA index.
import glob
import warnings
import math
import typing
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import geopandas as gpd
import xarray as xr
import rioxarray as rioxr
import seaborn as sns
from scipy import stats
from tqdm import tqdm
from matplotlib.colors import LinearSegmentedColormap
from matplotlib.colors import Normalize
# ECA&D station functions
def dms_to_decimal(dms_series) -> pd.Series:
"""DMS (degrees, minutes, seconds) to decimal degrees conversion."""
# Extract sign
signs = np.where(dms_series.str[0] == "+", 1, -1)
# Remove sign and split by ':'
parts = dms_series.str[1:].str.split(":", expand=True).astype(float)
# Calculate decimal degrees
decimal = signs * (parts[0] + parts[1] / 60 + parts[2] / 3600)
return decimal
def eca_read_stations(path: str) -> gpd.GeoDataFrame:
"""Load ECA&D stations from a CSV file and convert to GeoDataFrame."""
stations_df = pd.read_csv(path, skiprows=17, skipinitialspace=True)
# Strip whitespace from column names
stations_df.columns = stations_df.columns.str.strip()
# Strip whitespace from CN and STANAME columns
stations_df["CN"] = stations_df["CN"].str.strip()
stations_df["STANAME"] = stations_df["STANAME"].str.strip()
# Convert degrees-minutes-seconds to decimal degrees
lat_decimal = dms_to_decimal(stations_df["LAT"])
lon_decimal = dms_to_decimal(stations_df["LON"])
# Create GeoDataFrame
geometries = gpd.points_from_xy(lon_decimal, lat_decimal)
stations_gdf = gpd.GeoDataFrame(stations_df, geometry=geometries, crs="EPSG:4326")
# Drop original LAT and LON columns
stations_gdf.drop(columns=["LAT", "LON"], inplace=True)
return stations_gdf
def eca_read_station_tx(path: str) -> pd.DataFrame:
"""Read TX data for a single ECA&D station."""
station_df = pd.read_csv(path, skiprows=20, skipinitialspace=True, engine="c")
# Convert DATE column to datetime
station_df["DATE"] = pd.to_datetime(station_df["DATE"], format="%Y%m%d")
# Convert from tenths of degree C to degree C
station_df["TX"] = station_df["TX"] / 10.0
# Where Q_TX is 9, set TX to NaN
station_df = station_df[station_df["Q_TX"] == 0]
station_df.drop(columns=["Q_TX"], inplace=True)
return station_df
def eca_read_all_stations_tx(paths: list[str]) -> pd.DataFrame:
"""Read TX data for multiple ECA&D stations and concatenate into a single DataFrame."""
all_stations_df = pd.DataFrame()
for path in tqdm(paths, desc="Reading station TX data"):
station_df = eca_read_station_tx(path)
all_stations_df = pd.concat([all_stations_df, station_df], ignore_index=True)
return all_stations_df
# ERA5 loader
def era5_read_xr(path_glob: str) -> xr.Dataset:
"""Load ERA5 dataset from one or more NetCDF files."""
paths = list(glob.glob(path_glob))
dataset = xr.open_mfdataset(paths, combine="by_coords")
dataset.rio.write_crs("EPSG:4326", inplace=True)
return dataset
# NDVI functions
def ndvi_read_xr(path: str) -> xr.DataArray:
"""Load NDVI raster lazily and rescale from 0–255 to -1..1."""
# lazy read
ndvi = rioxr.open_rasterio(path, masked=True, chunks=True, lock=False)
ndvi = typing.cast(xr.DataArray, ndvi)
ndvi = ndvi.squeeze()
# mask nodata (255)
ndvi = ndvi.where(ndvi != 255)
# lazy scaling (dask)
ndvi = ndvi * (2 / 255) - 1
return ndvi
def ndvi_read_xr_bbox_clip(path: str, gdf: gpd.GeoDataFrame) -> xr.DataArray:
"""
Load NDVI lazily and clip to the bbox of the GeoDataFrame.
CRS is handled correctly by reprojecting the region to raster CRS.
"""
# Load NDVI
ndvi = ndvi_read_xr(path)
# Convert gdf to raster CRS
gdf_r = gdf.to_crs(ndvi.rio.crs)
minx, miny, maxx, maxy = gdf_r.total_bounds
# Clip to bbox
ndvi_filtered = ndvi.rio.clip_box(minx=minx, miny=miny, maxx=maxx, maxy=maxy)
return ndvi_filtered
# Custom colormap for NDVI visualization
ndvi_cmap = LinearSegmentedColormap.from_list("ndvi_cmap", ["blue", "white", "green"])
Dataset Loading¶
# GADM GeoDataFrame
gadm_gdf = gpd.read_file("../data/gadm_410_europe.gpkg")
# ERA 5 daily max temperature at 2m
era5_variable_of_interest = "temperature"
era5_xr = era5_read_xr(f"../data/derived-era5-land-daily-statistics/*_{era5_variable_of_interest}*.nc")
# ECA&D stations
stations_path = "../processed/ECA_blend_tx/stations.txt"
stations_gdf = eca_read_stations(stations_path)
1. Urban Heat Island Analysis - Bologna¶
We aim to investigate how the Urban Heat Island (UHI) effect manifests within our selected region of interest.
To begin, we will define the study area and load the corresponding datasets:
import rasterio.enums
# GADM
region_str = "Bologna"
gadm_region = gadm_gdf[gadm_gdf["NAME_1"] == "Emilia-Romagna"].dissolve()
gadm_bigger_region = gadm_gdf[gadm_gdf["NAME_0"] == "Italy"].dissolve()
gadm_region_level1 = gadm_region.dissolve("GID_1")
gadm_region_flat = gadm_region_level1.dissolve()
print(f"GADM region shape: {gadm_region_flat.shape}, CRS: {gadm_region_flat.crs}")
# NDVI
ndvi_region_str = "2023-06-01_2023-09-01"
ndvi_region = (
ndvi_read_xr(f"../data/sentinel2_ndvi/ndvi_{ndvi_region_str}.tif")
.rio.clip_box(*gadm_region_flat.total_bounds, crs=gadm_region_flat.crs)
.rio.clip(gadm_region_flat.geometry.values, crs=gadm_region_flat.crs, all_touched=True)
)
print(f"NDVI region shape: {ndvi_region.shape}, CRS: {ndvi_region.rio.crs}")
# ERA5
era5_day = "2023-08-10"
era5_region = era5_xr.rio.clip(gadm_bigger_region.geometry, gadm_bigger_region.crs, drop=True, all_touched=True)
era5_region = era5_region.sel(valid_time=era5_day)
era5_region_t2m = era5_region["t2m"]
era5_region_t2m = era5_region_t2m - 273.15 # Kelvin to Celsius
era5_region_t2m = era5_region_t2m.rio.reproject_match(ndvi_region, resampling=rasterio.enums.Resampling.bilinear)
print("ERA5 region shape:", era5_region_t2m.shape, "CRS:", era5_region_t2m.rio.crs)
GADM region shape: (1, 20), CRS: EPSG:4326 NDVI region shape: (1910, 3568), CRS: EPSG:3035 ERA5 region shape: (1910, 3568) CRS: EPSG:3035
DEGURBA Classification¶
To help us build a metric that captures the UHI effect, we’ll bring in another dataset that describes the degree of urbanization across our region.
The DEGURBA classification groups areas into three categories:
- Cities – densely populated
- Towns and suburbs – medium-density areas
- Rural areas – sparsely populated
Now let’s load the DEGURBA dataset and merge it with the GADM regions:
# Load DEGURBA dataset for Emilia-Romagna
def read_degurba(path: str) -> pd.DataFrame:
"""Load DEGURBA data from a JSON file into a DataFrame."""
with open(path, "r") as f:
degurba_data = json.load(f)
degurba_df = pd.DataFrame(degurba_data["resultset"])
return degurba_df
degurba_df = read_degurba("../data/degurba-emilia.json")
degurba_df.head()
| COD_RIP | COD_REG | COD_UTS | COD_PROV_STORICO | PRO_COM_T | PRO_COM | COMUNE | COMUNE_A | SIGLA_AUTOMOBILISTICA | COM_ISO | ... | DEGURBA_2011 | DEGURBA_2021 | COD_ECO_DIV_2018 | DEN_ECO_DIV_2018 | COD_ECO_PRO_2018 | DEN_ECO_PRO_2018 | COD_ECO_SEZ_2018 | DEN_ECO_SEZ_2018 | COD_ECO_SSEZ_2018 | DEN_ECO_SSEZ_2018 | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2 | 08 | 033 | 033 | 033001 | 33001 | Agazzano | None | PC | 0 | ... | 3 | 3 | 1 | Divisione Temperata | 1C | Provincia Appenninica | 1C1 | Sezione Appenninica Settentrionale e Nord-Occi... | 1C1a | Sottosezione Appennino Tosco-Emiliano |
| 1 | 2 | 08 | 033 | 033 | 033002 | 33002 | Alseno | None | PC | 0 | ... | 3 | 3 | 1 | Divisione Temperata | 1B | Provincia Padana | 1B1 | Sezione Padana | 1B1b | Sottosezione Pianura Centrale |
| 2 | 2 | 08 | 033 | 033 | 033003 | 33003 | Besenzone | None | PC | 0 | ... | 3 | 3 | 1 | Divisione Temperata | 1B | Provincia Padana | 1B1 | Sezione Padana | 1B1b | Sottosezione Pianura Centrale |
| 3 | 2 | 08 | 033 | 033 | 033004 | 33004 | Bettola | None | PC | 0 | ... | 3 | 3 | 1 | Divisione Temperata | 1C | Provincia Appenninica | 1C1 | Sezione Appenninica Settentrionale e Nord-Occi... | 1C1a | Sottosezione Appennino Tosco-Emiliano |
| 4 | 2 | 08 | 033 | 033 | 033005 | 33005 | Bobbio | None | PC | 0 | ... | 3 | 3 | 1 | Divisione Temperata | 1C | Provincia Appenninica | 1C1 | Sezione Appenninica Settentrionale e Nord-Occi... | 1C1a | Sottosezione Appennino Tosco-Emiliano |
5 rows × 24 columns
# Extract COMUNE names from urban areas (DEGURBA 1)
urban_areas = degurba_df[degurba_df["DEGURBA_2021"] == 1]["COMUNE"].str.lower().str.replace(" ", "")
print(f"{"Number of urban areas (DEGURBA 1):":<45} {len(urban_areas):3d}")
# Extract COMUNE names from rural areas (DEGURBA 2 and 3)
rural_areas = degurba_df[degurba_df["DEGURBA_2021"].isin([2, 3])]["COMUNE"].str.lower().str.replace(" ", "")
print(f"{"Number of rural areas (DEGURBA 2 and 3):":<45} {len(rural_areas):3d}")
Number of urban areas (DEGURBA 1): 11 Number of rural areas (DEGURBA 2 and 3): 319
# filter GADM municipalities for urban/rural
urban_gadm_gdf = gadm_gdf[gadm_gdf["NAME_3"].str.lower().str.replace(" ", "").isin(urban_areas)]
rural_gadm_gdf = gadm_gdf[gadm_gdf["NAME_3"].str.lower().str.replace(" ", "").isin(rural_areas)]
# Append all unclassified comuni to rural
all_emilia_comuni = gadm_gdf[gadm_gdf["NAME_1"] == "Emilia-Romagna"]
classified = set(urban_areas) | set(rural_areas)
# Find unclassified comuni
unclassified = all_emilia_comuni[~all_emilia_comuni["NAME_3"].str.lower().str.replace(" ", "").isin(classified)]
print(f"Number of unclassified entries: {len(unclassified)}")
print(f"Unclassified comuni: {unclassified['NAME_3'].tolist()}")
# Append to rural
rural_gadm_gdf = pd.concat([rural_gadm_gdf, unclassified], ignore_index=True)
print(f"Urban GADM shape: {urban_gadm_gdf.shape}")
print(f"Rural GADM shape: {rural_gadm_gdf.shape}")
Number of unclassified entries: 34 Unclassified comuni: ['Bazzano', 'Castello Di Serravalle', 'Crespellano', 'Granaglione', 'Monteveglio', 'Porretta Terme', 'Savigno', 'Berra', 'Formignana', 'Massa Fiscaglia', 'Migliarino', 'Migliaro', 'Mirabello', 'Ro', "Sant' Agostino", 'Tresigallo', 'Castrocaro Terme E Terra Del Sol', 'Mezzani', 'Polesine Parmense', 'Sissa', 'Sorbolo', 'Trecasali', 'Zibello', 'Caminata', 'Nibbiano', 'Pecorara', 'Busana', 'Collagna', 'Ligonchio', 'Ramiseto', 'Monte Colombo', 'Montescudo', 'Poggio Berni', 'Torriana'] Urban GADM shape: (11, 21) Rural GADM shape: (339, 21)
DEGURBA Visualization over GADM, ERA5-Land, and NDVI¶
We can now visualize the spatial distribution of GADM administrative divisions, ERA5-Land 2-meter temperatures, and NDVI values for our region of interest, overlaid with their respective urbanization classifications:
import matplotlib.patches as mpatches
from matplotlib.lines import Line2D
# Create figure with optimized layout
fig, axes = plt.subplots(1, 3, figsize=(20, 6.5))
# ============================================================================
# DATA PREPARATION
# ============================================================================
# Clip temperature to region boundaries
era5_temp_clipped = era5_region_t2m.rio.clip(
gadm_region_flat.geometry.values, gadm_region_flat.crs, drop=True, all_touched=True
)
# Precompute CRS transformations
temp_crs = era5_temp_clipped.rio.crs
ndvi_crs = ndvi_region.rio.crs
urban_temp_crs = urban_gadm_gdf.to_crs(temp_crs)
rural_temp_crs = rural_gadm_gdf.to_crs(temp_crs)
urban_ndvi_crs = urban_gadm_gdf.to_crs(ndvi_crs)
rural_ndvi_crs = rural_gadm_gdf.to_crs(ndvi_crs)
# ============================================================================
# PANEL 1: DEGURBA CLASSIFICATION
# ============================================================================
urban_gadm_gdf.dissolve(by="ENGTYPE_1", method="coverage").plot(
ax=axes[0], color="#D32F2F", alpha=0.75, edgecolor="#B71C1C", linewidth=0.8
)
rural_gadm_gdf.dissolve(by="ENGTYPE_1", method="coverage").plot(
ax=axes[0], color="#388E3C", alpha=0.75, edgecolor="#1B5E20", linewidth=0.8
)
# Legend
urban_patch = mpatches.Patch(label="Urban Areas (Type 1)", facecolor="#D32F2F", edgecolor="#B71C1C", alpha=0.75)
rural_patch = mpatches.Patch(label="Rural Areas (Type 2 & 3)", facecolor="#388E3C", edgecolor="#1B5E20", alpha=0.75)
axes[0].legend(handles=[urban_patch, rural_patch], loc="upper right", framealpha=0.95, fontsize=10)
axes[0].set_title("DEGURBA Classification", fontsize=14, fontweight="bold", pad=12)
axes[0].set_xlabel("Longitude", fontsize=11)
axes[0].set_ylabel("Latitude", fontsize=11)
axes[0].grid(True, alpha=0.25, linestyle="--", linewidth=0.5)
axes[0].tick_params(labelsize=9)
# ============================================================================
# PANEL 2: TEMPERATURE WITH DEGURBA BOUNDARIES
# ============================================================================
era5_temp_clipped.plot(
ax=axes[1],
cmap="RdYlGn_r",
vmin=15,
vmax=35,
alpha=0.85,
add_colorbar=True,
cbar_kwargs={"label": "Temperature (°C)", "shrink": 0.85, "pad": 0.02},
)
# Overlay boundaries
rural_temp_crs.boundary.plot(ax=axes[1], edgecolor="#2E7D32", linewidth=1.8, alpha=0.9)
urban_temp_crs.boundary.plot(ax=axes[1], edgecolor="#C62828", linewidth=2.2, alpha=0.95)
legend_elements = [
Line2D([0], [0], color="#C62828", linewidth=2.2, label="Urban (Type 1)"),
Line2D([0], [0], color="#2E7D32", linewidth=1.8, label="Rural (Type 2 & 3)"),
]
axes[1].set_title(
f"ERA5-Land Temperature ({era5_day})\nwith DEGURBA Boundaries", fontsize=14, fontweight="bold", pad=12
)
axes[1].set_xlabel("Longitude", fontsize=11)
axes[1].set_ylabel("Latitude", fontsize=11)
axes[1].legend(handles=legend_elements, loc="upper right", framealpha=0.95, fontsize=10)
axes[1].grid(True, alpha=0.25, linestyle="--", linewidth=0.5)
axes[1].tick_params(labelsize=9)
# ============================================================================
# PANEL 3: NDVI WITH DEGURBA BOUNDARIES
# ============================================================================
ndvi_region.plot(
ax=axes[2],
cmap=ndvi_cmap,
vmin=-1,
vmax=1,
alpha=0.9,
add_colorbar=True,
cbar_kwargs={"label": "NDVI", "shrink": 0.85, "pad": 0.02},
)
# Overlay boundaries
rural_ndvi_crs.boundary.plot(
ax=axes[2], edgecolor="#2E7D32", linewidth=1.8, alpha=0.85, linestyle="--", label="Rural (Type 2 & 3)"
)
urban_ndvi_crs.boundary.plot(ax=axes[2], edgecolor="#C62828", linewidth=2.2, alpha=0.9, label="Urban (Type 1)")
axes[2].set_title(f"NDVI ({ndvi_region_str})\nwith DEGURBA Boundaries", fontsize=14, fontweight="bold", pad=12)
axes[2].set_xlabel("Longitude", fontsize=11)
axes[2].set_ylabel("Latitude", fontsize=11)
axes[2].legend(loc="upper right", framealpha=0.95, fontsize=10)
axes[2].grid(True, alpha=0.25, linestyle="--", linewidth=0.5)
axes[2].tick_params(labelsize=9)
# ============================================================================
# FINAL LAYOUT
# ============================================================================
fig.suptitle(
"Bologna Province: DEGURBA Classification, Temperature, and NDVI Analysis", fontsize=16, fontweight="bold", y=0.99
)
plt.tight_layout(rect=[0, 0, 1, 0.97])
plt.show()
UHI Metric Calculation¶
The standard way to measure the UHI effect is to compare the average temperature in urban areas with the average temperature in rural areas. The difference between these two values gives us the UHI metric.
To calculate this metric at the pixel level, we first need the average temperature of rural areas. We will create a mask for the ERA5-Land temperature data based on the rural zones identified in the DEGURBA classification, and then compute the mean temperature for those rural pixels.
In this section we will work only with ERA5-Land data to keep things straightforward. The same procedure can also be applied to the interpolated ECA data, which we will use in the next part of the analysis.
# Clip ERA5 temperature to urban areas only
region_era5_urban = era5_region_t2m.rio.clip(urban_gadm_gdf.to_crs(era5_region_t2m.rio.crs).geometry, drop=False)
def urban_mask(ndvi, gadm, threshold: float, urban=True):
ndvi_clipped = ndvi.rio.clip(gadm.geometry.values, gadm.crs, drop=True)
if urban:
mask = ndvi_clipped < threshold
else:
mask = ndvi_clipped >= threshold
return ndvi_clipped.where(mask)
# Clip ERA5 to rural areas using NDVI thresholding
ndvi_threshold = 0.5
ndvi_rural_mask = urban_mask(ndvi_region, rural_gadm_gdf, threshold=ndvi_threshold, urban=False)
# Reproject NDVI mask to match ERA5 grid
ndvi_rural_mask_reprojected = ndvi_rural_mask.rio.reproject_match(era5_region_t2m)
# Clip ERA5 to rural areas
region_era5_rural = era5_region_t2m.rio.clip(rural_gadm_gdf.geometry.values, rural_gadm_gdf.crs, drop=False)
# Further filter by NDVI threshold: keep only where NDVI >= threshold
# Use .values to avoid coordinate broadcasting issues
region_era5_rural = region_era5_rural.where(~np.isnan(ndvi_rural_mask_reprojected.values))
# =============================================================================
# Visualize Urban and Rural Temperature Masks
# =============================================================================
fig, axes = plt.subplots(1, 2, figsize=(18, 6))
# Plot 1: Urban areas only
region_era5_urban.plot(ax=axes[0], cmap="RdYlGn_r", vmin=15, vmax=35, cbar_kwargs={"label": "Temperature (°C)"})
urban_gadm_gdf.to_crs(era5_region_t2m.rio.crs).boundary.plot(
ax=axes[0], edgecolor="red", linewidth=1.5, label="Urban boundaries"
)
axes[0].set_title("Urban Areas Only", fontsize=14, fontweight="bold")
axes[0].set_aspect("equal")
axes[0].legend(loc="upper right")
# Plot 2: Rural areas only
region_era5_rural.plot(ax=axes[1], cmap="RdYlGn_r", vmin=15, vmax=35, cbar_kwargs={"label": "Temperature (°C)"})
rural_gadm_gdf.to_crs(era5_region_t2m.rio.crs).boundary.plot(
ax=axes[1], edgecolor="green", linewidth=0.5, label="Rural boundaries"
)
axes[1].set_title("Rural Areas Only", fontsize=14, fontweight="bold")
axes[1].set_aspect("equal")
axes[1].legend(loc="upper right")
plt.suptitle(f"Urban and Rural Temperature Masks - {era5_day}", fontsize=16, fontweight="bold", y=1.02)
plt.tight_layout()
plt.show()
# Calculate rural mean temperatures for UHI index
rural_mean_temp = float(region_era5_rural.mean(skipna=True).values)
print(f"Rural mean temperature (NDVI >= {ndvi_threshold}): {rural_mean_temp:.2f}°C")
Rural mean temperature (NDVI >= 0.5): 26.92°C
Pixel-by-Pixel UHI Differential Index¶
Now that we have the average rural temperature, we can compute the UHI index for each pixel in the urban areas by subtracting the rural average temperature from the urban pixel temperatures.
import rasterio.enums
# Reproject ERA5 temperature to match NDVI grid using bilinear interpolation
era5_temp_ndvi_res = era5_region_t2m.rio.reproject_match(ndvi_region, resampling=rasterio.enums.Resampling.bilinear)
# Compute UHI at NDVI resolution
UHI_ndvi_res_raw = era5_temp_ndvi_res - rural_mean_temp
# Set all negative values to 0 (only keep heat islands, not cool zones)
UHI_ndvi_res = UHI_ndvi_res_raw.where(UHI_ndvi_res_raw >= 0, 0)
print("\nUHI statistics at NDVI resolution (negative values set to 0):")
print(f" Rural mean temperature: {rural_mean_temp:.2f}°C")
print(f" Min UHI: {float(UHI_ndvi_res.min()):.2f}°C (always 0 or higher)")
print(f" Max UHI: {float(UHI_ndvi_res.max()):.2f}°C")
print(f" Mean UHI: {float(UHI_ndvi_res.mean()):.2f}°C")
print(f" Std UHI: {float(UHI_ndvi_res.std()):.2f}°C")
UHI statistics at NDVI resolution (negative values set to 0): Rural mean temperature: 26.92°C Min UHI: 0.00°C (always 0 or higher) Max UHI: 4.72°C Mean UHI: 0.74°C Std UHI: 0.98°C
# =============================================================================
# Visualize UHI at NDVI Resolution (positive values only)
# =============================================================================
# Clip UHI data to region boundaries only
UHI_ndvi_res_clipped = UHI_ndvi_res.rio.clip(gadm_region_flat.to_crs(UHI_ndvi_res.rio.crs).geometry, drop=True)
fig, axes = plt.subplots(figsize=(14, 10))
# Plot UHI at NDVI resolution (only positive values, 0 to max)
UHI_ndvi_res_clipped.plot(
ax=axes,
cmap="YlOrRd", # Yellow-Orange-Red for heat intensity
vmin=0,
vmax=3,
cbar_kwargs={"label": "UHI Index (°C)\nHeat Island Intensity", "shrink": 0.8},
)
# Overlay boundaries
urban_gadm_gdf.to_crs(UHI_ndvi_res_clipped.rio.crs).boundary.plot(
ax=axes, edgecolor="darkred", linewidth=1.5, alpha=0.7, label="Urban areas"
)
rural_gadm_gdf.to_crs(UHI_ndvi_res_clipped.rio.crs).boundary.plot(
ax=axes, edgecolor="darkgreen", linewidth=0.5, alpha=0.5, label="Rural areas"
)
axes.set_title(f"UHI Differential Index at NDVI Resolution\n{era5_day}", fontsize=16, fontweight="bold")
axes.set_xlabel("Longitude", fontsize=12)
axes.set_ylabel("Latitude", fontsize=12)
axes.set_aspect("equal")
axes.legend(loc="upper right", fontsize=10)
plt.tight_layout()
plt.show()
Urban Heat Island Composite Index (UHICI)¶
To better quantify the Urban Heat Island effect, we can create a composite index called the Urban Heat Island Composite Index (UHICI). This index combines temperature, vegetation cover (NDVI), and urbanization level (DEGURBA) to provide a more comprehensive measure of heat island intensity.
We define the UHICI as follows:
Urban Heat Island Composite Index (UHICI)
The UHICI is the weighted sum of three normalized components:
- Maximum daily temperature ($T_{\text{max}}$)
- Normalized Difference Vegetation Index (NDVI)
- DEGURBA classification (urbanization level)
Formula:
$$\text{UHICI} = N\Big(w_1 \cdot N(T_{\text{max}}) + w_2 \cdot (1 - N(\text{NDVI})) + w_3 \cdot (1 - N(\text{DEGURBA}))\Big)$$
Where:
- $N(x) = \frac{x - \min}{\max - \min}$ normalizes values to [0, 1]
- Higher temperature → increases index (heat stress)
- Higher NDVI → decreases index (vegetation cools)
- Urban areas → increases index (heat vulnerability)
- $w_1, w_2, w_3$ = weights for each component (don't need to sum to 1 due to final normalization)
Output: 0 (coolest) to 1 (hottest heat island effect)
UHICI Analysis¶
In this section we will use the just defined UHICI index with two different datasets.
- ERA5-Land temperature data, interpolated to match the NDVI resolution
- ECA temperature data, interpolated with a nearest-neighbor approach to match the NDVI resolution
def normalize(data):
"""Min-Max normalize xarray DataArray."""
data_min = data.min()
data_max = data.max()
normalized = (data - data_min) / (data_max - data_min)
return normalized
def UHICI(temperature, ndvi, degurba, weights: tuple[float, float, float]):
"""Compute Urban Heat Island Composite Index (UHICI)."""
uhici = (
weights[0] * (normalize(temperature))
+ weights[1] * (1 - normalize(ndvi))
+ weights[2] * (1 - normalize(degurba))
)
# Normalize the final index to [0, 1] range
uhici = normalize(uhici)
return uhici
To compute the UHICI, we need to prepare the three components: temperature, NDVI, and DEGURBA classification. We will do that using xarray DataArrays.
def create_degurba_raster(gdf_urban, gdf_rural, reference_raster):
"""
Create a DEGURBA raster matching the reference raster's grid.
Parameters:
gdf_urban: GeoDataFrame of urban areas (DEGURBA = 1)
gdf_rural: GeoDataFrame of rural areas (DEGURBA = 2 or 3)
reference_raster: xarray DataArray to match (e.g., NDVI or temperature)
Returns:
xarray DataArray with DEGURBA values (1 for urban, 2/3 for rural)
"""
from rasterio import features
# Get the reference raster properties
height, width = reference_raster.shape
transform = reference_raster.rio.transform()
crs = reference_raster.rio.crs
# Reproject GeoDataFrames to match reference CRS
gdf_urban_reproj = gdf_urban.to_crs(crs)
gdf_rural_reproj = gdf_rural.to_crs(crs)
# Create empty raster
degurba_array = np.zeros((height, width), dtype=np.float32)
# Burn rural areas (value = 2.5 as average of 2 and 3)
if not gdf_rural_reproj.empty:
rural_shapes = ((geom, 2.5) for geom in gdf_rural_reproj.geometry)
features.rasterize(rural_shapes, out=degurba_array, transform=transform, fill=0, dtype=np.float32)
else:
warnings.warn("Rural GeoDataFrame is empty; no rural areas will be rasterized.")
# Burn urban areas (value = 1, will overwrite rural where they overlap)
if not gdf_urban_reproj.empty:
urban_shapes = ((geom, 1.0) for geom in gdf_urban_reproj.geometry)
features.rasterize(urban_shapes, out=degurba_array, transform=transform, all_touched=False, dtype=np.float32)
else:
warnings.warn("Urban GeoDataFrame is empty; no urban areas will be rasterized.")
# Create xarray DataArray from numpy array
degurba_xr = xr.DataArray(degurba_array, coords={"y": reference_raster.y, "x": reference_raster.x}, dims=["y", "x"])
# Set spatial metadata
degurba_xr.rio.write_crs(crs, inplace=True)
degurba_xr.rio.write_transform(transform, inplace=True)
# Mask areas outside municipalities (set to NaN)
degurba_xr = degurba_xr.where(degurba_xr.values > 0)
return degurba_xr
def create_eca_raster(
stations_gdf: gpd.GeoDataFrame,
reference_raster: xr.DataArray,
value_column: str,
):
"""
Create a raster from ECA&D station data matching the reference raster's grid.
Parameters:
stations_gdf: GeoDataFrame with station locations and values
reference_raster: xarray DataArray to match (e.g., NDVI or temperature)
value_column: Column name in stations_gdf containing the values to rasterize
Returns:
xarray DataArray with rasterized station values
"""
from scipy.spatial import cKDTree
# Get the reference raster properties
height, width = reference_raster.shape
transform = reference_raster.rio.transform()
crs = reference_raster.rio.crs
# Reproject stations to match reference CRS
stations_reproj = stations_gdf.to_crs(crs)
# Create empty raster
station_array = np.full((height, width), np.nan, dtype=np.float32)
station_coords = []
station_values = []
# Populate with station data
for idx, row in stations_reproj.iterrows():
station_lon = row.geometry.x
station_lat = row.geometry.y
station_value = row[value_column]
# Compute pixel indices
x_idx = int((station_lon - transform[2]) / transform[0])
y_idx = int((station_lat - transform[5]) / transform[4])
if 0 <= x_idx < width and 0 <= y_idx < height:
station_array[y_idx, x_idx] = station_value
station_coords.append([y_idx, x_idx])
station_values.append(station_value)
station_coords = np.array(station_coords)
station_values = np.array(station_values)
# Build KDTree for fast nearest neighbor lookup
kdtree = cKDTree(station_coords)
# Vectorized approach: find all NaN pixels at once
nan_mask = np.isnan(station_array)
nan_indices = np.argwhere(nan_mask)
if len(nan_indices) > 0:
# Query the KDTree for nearest neighbors
distances, indices = kdtree.query(nan_indices)
# Fill in the NaN values
for (i, j), station_idx in zip(nan_indices, indices):
station_array[i, j] = station_values[station_idx]
station_da = xr.DataArray(
station_array,
dims=["y", "x"],
coords={"y": reference_raster.y, "x": reference_raster.x},
)
return station_da
degurba_raster = create_degurba_raster(urban_gadm_gdf, rural_gadm_gdf, ndvi_region)
print(
f"DEGURBA raster shape: {degurba_raster.shape}"
f"\n\tCRS: {degurba_raster.rio.crs}"
f"\n\tNaN count: {np.isnan(degurba_raster).sum().item()} ({100 * np.isnan(degurba_raster).sum().item() / degurba_raster.size:.2f}%)"
)
DEGURBA raster shape: (1910, 3568) CRS: EPSG:3035 NaN count: 3302096 (48.45%)
region_stations = stations_gdf[stations_gdf.within(gadm_bigger_region.union_all(method="coverage"))]
region_stations_tx = eca_read_all_stations_tx(
[f"../processed/ECA_blend_tx/TX_STAID{id:06d}.txt" for id in region_stations["STAID"]]
)
region_stations_tx = region_stations_tx.merge(
region_stations[["STAID", "geometry"]], left_on="STAID", right_on="STAID", how="left"
)
region_stations_tx_gdf = gpd.GeoDataFrame(region_stations_tx, geometry="geometry", crs=stations_gdf.crs)
print(f"ECA&D TX stations in region: {region_stations_tx_gdf.shape}\n\tCRS: {region_stations_tx_gdf.crs}")
Reading station TX data: 100%|███████████████████████████████████| 1111/1111 [00:53<00:00, 20.83it/s]
ECA&D TX stations in region: (5931397, 5) CRS: EPSG:4326
eca_day = era5_day
eca_raster = create_eca_raster(region_stations_tx_gdf[region_stations_tx_gdf["DATE"] == eca_day], ndvi_region, "TX")
print(
f"ECA&D raster shape: {eca_raster.shape}"
f"\n\tCRS: {eca_raster.rio.crs}"
f"\n\tNaN count: {np.isnan(eca_raster).sum().item()} ({100 * np.isnan(eca_raster).sum().item() / eca_raster.size:.2f}%)"
)
ECA&D raster shape: (1910, 3568) CRS: None NaN count: 0 (0.00%)
gadm_bigger_region = gadm_gdf[gadm_gdf["GID_0"] == "ITA"].dissolve()
# Clip ERA5 to a larger reagion of interest to avoid missing data at the edges
era5_region = era5_xr.rio.clip(gadm_bigger_region.geometry, gadm_bigger_region.crs, drop=True, invert=True)
era5_region = era5_region.sel(valid_time=era5_day)
uhici_era = UHICI(era5_region_t2m, ndvi_region, degurba_raster, weights=(0.3, 0.6, 0.1))
uhici_eca = UHICI(eca_raster, ndvi_region, degurba_raster, weights=(0.3, 0.6, 0.1))
# plot uhici dataarray over Emilia-Romagna map
fig, axes = plt.subplots(1, 2, figsize=(16, 7))
uhici_era.plot(
ax=axes[0],
cmap="YlOrRd",
vmin=uhici_era.min().values,
vmax=uhici_era.max().values,
cbar_kwargs={"label": "UHICI Index", "shrink": 0.8},
)
gadm_region.to_crs(uhici_era.rio.crs).dissolve("GID_2").plot(
ax=axes[0], facecolor="none", edgecolor="black", linewidth=0.5
)
axes[0].set_xlabel("Longitude")
axes[0].set_ylabel("Latitude")
axes[0].set_title(f"UHICI \ndate: {era5_day}\nEmilia-Romagna", fontsize=12, pad=12)
uhici_eca.plot(
ax=axes[1],
cmap="YlOrRd",
vmin=uhici_eca.min().values,
vmax=uhici_eca.max().values,
cbar_kwargs={"label": "UHICI Index", "shrink": 0.8},
)
gadm_region.to_crs(uhici_eca.rio.crs).dissolve("GID_2").plot(
ax=axes[1], facecolor="none", edgecolor="black", linewidth=0.5
)
axes[1].set_xlabel("Longitude")
axes[1].set_ylabel("Latitude")
axes[1].set_title(f"UHICI from ECA&D \ndate: {eca_day}\nEmilia-Romagna", fontsize=12, pad=12)
fig.suptitle("Urban Heat Island Composite Index (UHICI) Comparison", fontsize=16, fontweight="bold", y=1.02)
plt.tight_layout()
plt.show()
We can zoom on the city of Bologna to visualize the UHICI index more clearly.
# plot uhici dataarray over Emilia-Romagna map
bologna_region = gadm_gdf[gadm_gdf["NAME_2"] == "Bologna"].dissolve()
fig, axes = plt.subplots(1, 2, figsize=(16, 7))
uhici_era.plot(
ax=axes[0],
cmap="YlOrRd",
vmin=uhici_era.min().values,
vmax=uhici_era.max().values,
cbar_kwargs={"label": "UHICI Index", "shrink": 0.8},
)
bologna_region.to_crs(uhici_era.rio.crs).plot(ax=axes[0], facecolor="none", edgecolor="black", linewidth=0.5)
axes[0].set_xlabel("Longitude")
axes[0].set_ylabel("Latitude")
axes[0].set_title(f"UHICI \ndate: {era5_day}\nBologna", fontsize=12, pad=12)
axes[0].set_xlim(bologna_region.to_crs(uhici_era.rio.crs).total_bounds[[0, 2]])
axes[0].set_ylim(bologna_region.to_crs(uhici_era.rio.crs).total_bounds[[1, 3]])
uhici_eca.plot(
ax=axes[1],
cmap="YlOrRd",
vmin=uhici_eca.min().values,
vmax=uhici_eca.max().values,
cbar_kwargs={"label": "UHICI Index", "shrink": 0.8},
)
bologna_region.to_crs(uhici_eca.rio.crs).plot(
ax=axes[1], facecolor="none", edgecolor="black", linewidth=0.5
)
axes[1].set_xlabel("Longitude")
axes[1].set_ylabel("Latitude")
axes[1].set_title(f"UHICI from ECA&D \ndate: {eca_day}\nBologna", fontsize=12, pad=12)
axes[1].set_xlim(bologna_region.to_crs(uhici_eca.rio.crs).total_bounds[[0, 2]])
axes[1].set_ylim(bologna_region.to_crs(uhici_eca.rio.crs).total_bounds[[1, 3]])
fig.suptitle("Urban Heat Island Composite Index (UHICI) Comparison", fontsize=16, fontweight="bold", y=1.02)
plt.tight_layout()
plt.show()
UHICI vs NDVI¶
We can also visualize the relation between UHICI and NDVI values to see how vegetation impacts urban heat intensity.
# Flatten arrays
ndvi_region_flat = ndvi_region.values.flatten()
uhici_era_flat = uhici_era.values.flatten()
uhici_eca_flat = uhici_eca.values.flatten()
# Remove NaNs (if any) and keep only valid pairs
er_valid_mask = ~np.isnan(ndvi_region_flat) & ~np.isnan(uhici_era_flat) & ~np.isnan(uhici_eca_flat)
er_ndvi_valid = ndvi_region_flat[er_valid_mask]
er_uhici_era_valid = uhici_era_flat[er_valid_mask] # ERA5
er_uhici_eca_valid = uhici_eca_flat[er_valid_mask] # ECA
# Randomly sample 1% of the data
def random_sample(data, fraction=0.01, seed=42):
np.random.seed(seed)
sample_size = max(1, int(fraction * len(data)))
sample_indices = np.random.choice(len(data), size=sample_size, replace=False)
return data[sample_indices]
er_ndvi_sample = random_sample(er_ndvi_valid, fraction=0.1)
er_uhici_era_sample = random_sample(er_uhici_era_valid, fraction=0.1) # ERA5
er_uhici_eca_sample = random_sample(er_uhici_eca_valid, fraction=0.1) # ECA
# Create a white->yellow->red linear colormap and plotting
cmap_wyr = LinearSegmentedColormap.from_list("wyr", ["yellow", "red"])
# Single scatter plot of sampled UHICI with color gradient vs sampled NDVI
fig, axes = plt.subplots(2, 1, figsize=(12, 10))
scatter = axes[0].scatter(
x=er_ndvi_sample,
y=er_uhici_era_sample,
c=er_uhici_era_sample,
cmap=cmap_wyr,
alpha=0.5,
)
cbar = plt.colorbar(scatter, ax=axes[0])
cbar.set_label("UHICI Value")
axes[0].set_xlabel("NDVI")
axes[0].set_ylabel("UHICI")
axes[0].set_title("Bologna: UHICI vs NDVI (ERA5) \nDate: " + era5_day)
axes[0].grid(True, linestyle="--", alpha=0.5)
scatter = axes[1].scatter(
x=er_ndvi_sample,
y=er_uhici_eca_sample,
c=er_uhici_eca_sample,
cmap="viridis",
alpha=0.5,
)
cbar = plt.colorbar(scatter, ax=axes[1])
cbar.set_label("UHICI Value")
axes[1].set_xlabel("NDVI")
axes[1].set_ylabel("UHICI")
axes[1].set_title("Bologna: UHICI vs NDVI (ECA) \nDate: " + era5_day)
axes[1].grid(True, linestyle="--", alpha=0.5)
plt.tight_layout()
plt.show()
def plot_uhci_distribution(uhici_era_flat: np.ndarray, ax=None):
"""Plot the distribution of UHICI values with colored bars."""
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.colors import LinearSegmentedColormap, Normalize
# Remove NaN values
uhici_era_flat = uhici_era_flat[~np.isnan(uhici_era_flat)]
# Binning
bins = 50
data_min = np.nanmin(uhici_era_flat) if len(uhici_era_flat) > 0 else 0
data_max = np.nanmax(uhici_era_flat) if len(uhici_era_flat) > 0 else 1
bin_edges = np.linspace(data_min, data_max, bins + 1)
# Create white->yellow->red colormap for bar coloring
cmap_wyr = LinearSegmentedColormap.from_list("wyr", ["white", "yellow", "red"])
norm = Normalize(vmin=data_min, vmax=data_max)
# Compute histogram
counts, _ = np.histogram(uhici_era_flat, bins=bin_edges)
bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
bin_width = bin_edges[1] - bin_edges[0]
# Map colors from bin centers
colors = cmap_wyr(norm(bin_centers))
# Plot histogram
ax = ax or plt.gca()
ax.bar(bin_centers, counts, width=bin_width, color=colors, edgecolor="black", alpha=0.9)
fig, axes = plt.subplots(2, 1, figsize=(12, 10))
plot_uhci_distribution(er_uhici_era_valid, ax=axes[0])
axes[0].set_title("UHICI Distribution (ERA5) - Bologna", fontsize=14, fontweight="bold")
axes[0].set_xlabel("UHICI Value")
axes[0].set_ylabel("Frequency")
plot_uhci_distribution(er_uhici_eca_valid, ax=axes[1])
axes[1].set_title("UHICI Distribution (ECA) - Bologna", fontsize=14, fontweight="bold")
axes[1].set_xlabel("UHICI Value")
axes[1].set_ylabel("Frequency")
plt.tight_layout()
plt.show()
# Calculate 3 equal partitions of NDVI (terciles) for Emilia-Romagna
# Emilia-Romagna arrays
def ndvi_terciles_boxplot(ndvi_region: xr.DataArray, uhici: xr.DataArray, ax=None):
"""Create a boxplot of UHICI values partitioned by NDVI terciles."""
# Flatten arrays
ndvi_region_flat = ndvi_region.values.flatten()
uhici_flat = uhici.values.flatten()
mask_e = ~np.isnan(ndvi_region_flat) & ~np.isnan(uhici_flat)
ndvi_e_valid = ndvi_region_flat[mask_e]
uhici_e_valid = uhici_flat[mask_e]
# Compute terciles
ndvi_e_partitions = np.percentile(ndvi_e_valid, [33.33, 66.66]) if len(ndvi_e_valid) > 0 else (0, 0)
# Prepare data for violin plot
partition_labels = ["Low NDVI", "Medium NDVI", "High NDVI"]
violin_data = []
violin_labels = []
for i in range(3):
if i == 0:
mask_i = ndvi_e_valid <= ndvi_e_partitions[0]
elif i == 1:
mask_i = (ndvi_e_valid > ndvi_e_partitions[0]) & (ndvi_e_valid <= ndvi_e_partitions[1])
else:
mask_i = ndvi_e_valid > ndvi_e_partitions[1]
uhici_partition = uhici_e_valid[mask_i]
if len(uhici_partition) > 0:
violin_data.extend(uhici_partition)
violin_labels.extend([partition_labels[i]] * len(uhici_partition))
# Create DataFrame for violin plot
boxplot_df = pd.DataFrame({"NDVI Class": violin_labels, "UHICI": violin_data})
ax = ax or plt.gca()
sns.boxplot(
data=boxplot_df,
x="NDVI Class",
y="UHICI",
order=partition_labels,
hue="NDVI Class",
legend=False,
palette=["red", "orange", "lightgreen"],
ax=ax,
)
# Plot violin plot
fig, axes = plt.subplots(1, 2, figsize=(12, 6))
ndvi_terciles_boxplot(ndvi_region, uhici_era, ax=axes[0])
axes[0].set_title("UHICI Distribution by NDVI Partitions — Bologna\nDate: " + era5_day, fontsize=12, fontweight="bold")
axes[0].set_xlabel("NDVI Class", fontsize=11)
axes[0].set_ylabel("UHICI", fontsize=11)
axes[0].grid(True, linestyle="--", alpha=0.3, axis="y")
ndvi_terciles_boxplot(ndvi_region, uhici_eca, ax=axes[1])
axes[1].set_title("UHICI Distribution by NDVI Partitions — Bologna\nDate: " + era5_day, fontsize=12, fontweight="bold")
axes[1].set_xlabel("NDVI Class", fontsize=11)
axes[1].set_ylabel("UHICI", fontsize=11)
axes[1].grid(True, linestyle="--", alpha=0.3, axis="y")
plt.tight_layout()
plt.show()
2. Explaining the UHI Effect: TX - NDVI Correlation Analysis¶
To explain the UHI effect further, we can analyze the correlation between maximum daily temperature (TX) and NDVI values across our region of interest.
We will focus on the city of Bologna to limit the number of data points and make the analysis more manageable, but the same approach can be applied to other cities or the entire region.
To do this, we will:
- Load station TX data and NDVI data for the region.
- For each station, compute the mean NDVI in a 1km buffer around the station.
- For each period (e.g., monthly, seasonal), compute the mean TX for each station
Then, for each period we will:
- Plot the TX time series for stations colored by their NDVI values.
- Calculate and plot the correlation between TX and NDVI with a regression line.
bologna_region_gdf = gadm_gdf[gadm_gdf["NAME_2"] == "Bologna"]
bologna_region_union = bologna_region_gdf.dissolve()
bologna_stations_gdf = stations_gdf[stations_gdf.within(bologna_region_union.union_all())]
print(f"Region has {len(bologna_stations_gdf)} stations.")
display(bologna_stations_gdf.sample(5))
Region has 60 stations.
| STAID | STANAME | CN | HGHT | geometry | |
|---|---|---|---|---|---|
| 4454 | 17546 | BUDRIO_DONDINA | IT | 30 | POINT (11.54944 44.52167) |
| 4478 | 17580 | CASTEL_SAN_PIETRO | IT | 58 | POINT (11.59667 44.41083) |
| 4464 | 17563 | CASALECCHIO_CANALE | IT | 63 | POINT (11.28056 44.47556) |
| 4524 | 17643 | INVASO | IT | 460 | POINT (11.22167 44.22694) |
| 1170 | 2807 | DIGA DI PAVANA | IT | 480 | POINT (11.00556 44.11806) |
Let's visualize the stations in the selected region first:
fig, axes = plt.subplots()
bologna_region_gdf.plot(facecolor="none", edgecolor="black", linewidth=1, ax=axes)
bologna_stations_gdf.plot(ax=axes, color="red", markersize=20)
plt.title("ECA&D Stations in Bologna Region")
plt.axis("off")
plt.show()
Before continuing, we need to define some utility functions:
def _load_station_tx_data(stations_in_region: gpd.GeoDataFrame, eca_data_path: str) -> pd.DataFrame:
station_paths = [f"{eca_data_path}/TX_STAID{sid:06d}.txt" for sid in stations_in_region["STAID"]]
stations_tx_df = eca_read_all_stations_tx(station_paths)
# Merge observations with station metadata
stations_tx_df = stations_tx_df.merge(
stations_in_region[["STAID", "STANAME", "geometry"]],
on="STAID",
how="left",
)
return stations_tx_df
ndvi_period_to_season = {
r"(\d{4})-12-01_(\d{4})-03-01": "Winter",
r"(\d{4})-03-01_(\d{4})-06-01": "Spring",
r"(\d{4})-06-01_(\d{4})-09-01": "Summer",
r"(\d{4})-09-01_(\d{4})-12-01": "Autumn",
}
def ndvi_name_to_season(ndvi_name: str) -> str:
import re
for pattern, season_name in ndvi_period_to_season.items():
if re.match(pattern, ndvi_name):
return season_name
return "Unknown"
def _get_ndvi_periods(ndvi_files_pattern: str) -> list[dict]:
import glob
import re
ndvi_files = glob.glob(ndvi_files_pattern)
ndvi_periods = []
for ndvi_file in ndvi_files:
match = re.search(r"ndvi_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})\.tif", ndvi_file)
if not match:
continue
start_date = pd.to_datetime(match.group(1))
end_date = pd.to_datetime(match.group(2))
ndvi_periods.append(
{
"file": ndvi_file,
"start_date": start_date,
"end_date": end_date,
"period_str": f"{match.group(1)}_{match.group(2)}",
}
)
return ndvi_periods
def _calculate_station_ndvi(
station_gdf: gpd.GeoDataFrame,
ndvi_data: xr.DataArray,
buffer_meters: float,
) -> float:
"""
Calculate mean NDVI around a station within a buffer in meters.
Parameters:
- station_gdf: GeoDataFrame with a single station point geometry.
- ndvi_data: xarray DataArray with NDVI data.
- buffer_meters: Buffer radius in meters.
"""
# we need a projected CRS in meters
station_m = station_gdf.to_crs("EPSG:3857")
buffer_m = station_m.geometry.buffer(buffer_meters)
# reproject back to raster CRS
buffer_m = buffer_m.to_crs(ndvi_data.rio.crs)
ndvi_subset = ndvi_data.rio.clip([buffer_m.iloc[0]], drop=True)
mean_ndvi = float(ndvi_subset.mean().values)
return mean_ndvi
Now we can proceed with the TX-NDVI correlation analysis and visualizations.
To calculate the correlation between TX and NDVI for each station, we will first create a dataframe that contains for each NDVI period the mean TX and mean NDVI for each station.
def prepare_tx_ndvi_corr_df(
region_gdf: gpd.GeoDataFrame,
stations_gdf: gpd.GeoDataFrame,
ndvi_files_pattern: str = "../data/sentinel2_ndvi/ndvi_*.tif",
buffer_meters: float = 1000.0,
eca_data_path: str = "../processed/ECA_blend_tx",
) -> pd.DataFrame:
import warnings
stations_in_region = stations_gdf[stations_gdf.within(region_gdf.union_all(method="coverage"))]
if stations_in_region.empty:
print("Warning: No stations found in region")
return pd.DataFrame()
stations_tx_df = _load_station_tx_data(stations_in_region, eca_data_path)
ndvi_periods = _get_ndvi_periods(ndvi_files_pattern)
results = []
total_steps = len(ndvi_periods) * len(stations_in_region)
with tqdm(total=total_steps, desc="TX-NDVI") as pbar:
for period in ndvi_periods:
pbar.set_postfix_str(f"Period: {period['period_str']}")
period_str = period["period_str"]
ndvi_data = ndvi_read_xr_bbox_clip(period["file"], region_gdf)
ndvi_data = ndvi_data.compute() # eagerly load for faster access
stations_tx = stations_tx_df[
(stations_tx_df["DATE"] >= period["start_date"]) & (stations_tx_df["DATE"] <= period["end_date"])
]
if len(stations_tx) == 0:
warnings.warn(f"No TX data for period {period_str}")
continue
stations_avg_tx = stations_tx.groupby("STAID")["TX"].mean()
for _, station_row in stations_in_region.iterrows():
pbar.update(1)
staid = station_row["STAID"]
staname = station_row["STANAME"]
if staid not in stations_avg_tx.index:
# no TX data for this station in this period
continue
avg_tx = stations_avg_tx[staid]
station_gdf = gpd.GeoDataFrame([station_row], geometry="geometry", crs=stations_in_region.crs)
mean_ndvi = _calculate_station_ndvi(station_gdf, ndvi_data, buffer_meters)
if np.isnan(mean_ndvi):
warnings.warn(f"NaN NDVI for station {staid} ({staname}) in period {period_str}")
continue
result = {
"STAID": staid,
"STANAME": station_row["STANAME"],
"period": period["period_str"],
"start_date": period["start_date"],
"end_date": period["end_date"],
"avg_TX": avg_tx,
"mean_NDVI_2km": mean_ndvi,
"lon": station_row.geometry.x,
"lat": station_row.geometry.y,
}
results.append(result)
results_df = pd.DataFrame(results)
return results_df
tx_ndvi_df = prepare_tx_ndvi_corr_df(
region_gdf=bologna_region_union,
stations_gdf=bologna_stations_gdf,
buffer_meters=1000,
)
print(f"Total results: {len(tx_ndvi_df)} station-period combinations")
tx_ndvi_df.head()
Reading station TX data: 100%|███████████████████████████████████████| 60/60 [00:00<00:00, 83.21it/s] TX-NDVI: 100%|██████████████████████| 960/960 [01:40<00:00, 9.53it/s, Period: 2023-09-01_2023-12-01]
Total results: 864 station-period combinations
| STAID | STANAME | period | start_date | end_date | avg_TX | mean_NDVI_2km | lon | lat | |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 861 | MONZUNO | 2020-03-01_2020-06-01 | 2020-03-01 | 2020-06-01 | 20.481250 | 0.576191 | 11.273333 | 44.276667 |
| 1 | 862 | PORRETTA TERME | 2020-03-01_2020-06-01 | 2020-03-01 | 2020-06-01 | 21.684848 | 0.494197 | 10.976944 | 44.154167 |
| 2 | 2791 | CASTEL DEL RIO | 2020-03-01_2020-06-01 | 2020-03-01 | 2020-06-01 | 23.690909 | 0.605687 | 11.505000 | 44.210833 |
| 3 | 2804 | COTTEDE | 2020-03-01_2020-06-01 | 2020-03-01 | 2020-06-01 | 18.918182 | 0.682569 | 11.169167 | 44.109444 |
| 4 | 2805 | DIGA DEL BRASIMONE | 2020-03-01_2020-06-01 | 2020-03-01 | 2020-06-01 | 17.184375 | 0.415361 | 11.117778 | 44.129167 |
We can now visualize the timeseries of TX for the stations in the region, colored by their NDVI values.
def display_station_tx_timeseries(
region_gdf: gpd.GeoDataFrame,
stations_gdf: gpd.GeoDataFrame,
tx_ndvi_df: pd.DataFrame,
eca_data_path: str = "../processed/ECA_blend_tx",
):
from matplotlib import cm
from matplotlib.colors import Normalize
stations_in_region = stations_gdf[stations_gdf.within(region_gdf.union_all(method="coverage"))]
if stations_in_region.empty:
warnings.warn("No stations found in region")
return
stations_tx_df = _load_station_tx_data(stations_in_region, eca_data_path)
# Get unique periods from tx_ndvi_df
periods = sorted(tx_ndvi_df["period"].unique())
num_plots = len(periods)
ncols = 2
nrows = max(math.ceil(num_plots / ncols), 1)
fig, axes = plt.subplots(
nrows,
ncols,
figsize=(10 * ncols, 4.0 * nrows),
squeeze=False,
)
axes_flat = axes.ravel()
# Colormap for NDVI values
cmap = plt.colormaps["RdYlGn"]
for i, period_str in tqdm(list(enumerate(periods)), desc="Plotting periods"):
ax = axes_flat[i]
# Detect season from period string
season = ndvi_name_to_season(period_str)
# Get NDVI data for this period from pre-calculated results
period_ndvi = tx_ndvi_df[tx_ndvi_df["period"] == period_str][["STAID", "mean_NDVI_2km"]].copy()
if period_ndvi.empty:
ax.text(
0.5,
0.5,
"No NDVI data",
ha="center",
va="center",
transform=ax.transAxes,
)
ax.set_title(f"{season} - {period_str}")
continue
start_date = tx_ndvi_df[tx_ndvi_df["period"] == period_str]["start_date"].iloc[0]
end_date = tx_ndvi_df[tx_ndvi_df["period"] == period_str]["end_date"].iloc[0]
# Filter TX data for this period
period_tx = stations_tx_df[(stations_tx_df["DATE"] >= start_date) & (stations_tx_df["DATE"] <= end_date)].copy()
if len(period_tx) == 0:
warnings.warn(f"No TX data for period {period_str}")
ax.text(
0.5,
0.5,
"No TX data",
ha="center",
va="center",
transform=ax.transAxes,
)
ax.set_title(f"Period: {period_str}")
continue
# Merge NDVI values directly into TX data using DataFrame operations
period_tx_with_ndvi = period_tx.merge(period_ndvi, on="STAID", how="inner")
if period_tx_with_ndvi.empty:
continue
# Normalize NDVI values for color mapping
ndvi_min = period_tx_with_ndvi["mean_NDVI_2km"].min()
ndvi_max = period_tx_with_ndvi["mean_NDVI_2km"].max()
norm = Normalize(vmin=ndvi_min, vmax=ndvi_max)
# Plot each station's time series with NDVI-based color using seaborn with weekly averaging
for staid, station_group in period_tx_with_ndvi.groupby("STAID"):
ndvi_val = station_group["mean_NDVI_2km"].iloc[0]
color = cmap(norm(ndvi_val))
sns.lineplot(
data=station_group,
x="DATE",
y="TX",
ax=ax,
color=color,
linewidth=1.5,
alpha=0.6,
marker="o",
markersize=4,
)
# Add colorbar
sm = cm.ScalarMappable(cmap=cmap, norm=norm)
sm.set_array([])
cbar = plt.colorbar(sm, ax=ax, orientation="vertical", pad=0.02, fraction=0.046)
cbar.set_label("Mean NDVI", fontsize=9)
ax.set_title(f"{season} - Station TX Time Series (Weekly Avg)\n{period_str}", fontsize=11)
ax.set_xlabel("Date")
ax.set_ylabel("Maximum Temperature (°C)")
ax.grid(alpha=0.3)
# Hide unused subplots
for j in range(i + 1, len(axes_flat)):
axes_flat[j].axis("off")
plt.tight_layout()
plt.show()
bologna_region_gdf = gadm_gdf[gadm_gdf["NAME_3"] == "Bologna"]
bologna_region_gdf = bologna_region_gdf.union_all("coverage")
# Find stations within the region
bologna_stations_gdf = stations_gdf[stations_gdf.within(bologna_region_union.union_all(method="coverage"))]
# Display station TX time series colored by NDVI for region (using pre-calculated NDVI)
display_station_tx_timeseries(
region_gdf=bologna_region_union,
stations_gdf=bologna_stations_gdf,
tx_ndvi_df=tx_ndvi_df,
)
Reading station TX data: 100%|███████████████████████████████████████| 60/60 [00:00<00:00, 89.62it/s] Plotting periods: 100%|██████████████████████████████████████████████| 16/16 [00:07<00:00, 2.23it/s]
Finally, we can visualize the TX-NDVI correlation with regression lines for each period.
def graph_tx_ndvi_correlation(er_results_df: pd.DataFrame):
"""Graph TX-NDVI correlation results with faceted scatter plots."""
required_columns = {"avg_TX", "mean_NDVI_2km", "period"}
if not required_columns.issubset(er_results_df.columns):
raise ValueError(f"er_results_df must contain columns: {required_columns}")
elif er_results_df.empty:
raise ValueError("er_results_df is empty.")
# one axis per NDVI period
periods = sorted(er_results_df["period"].unique())
n_periods = len(periods)
ncols = min(4, n_periods)
nrows = max(math.ceil(n_periods / ncols), 1)
fig, axes = plt.subplots(
nrows,
ncols,
figsize=(5.0 * ncols, 4.2 * nrows),
squeeze=False,
)
axes_flat = axes.ravel()
season_to_color = {
"Winter": "#1F77B4",
"Spring": "#2CA02C",
"Summer": "#D62728",
"Autumn": "#FF7F0E",
}
for i, period in enumerate(periods):
ax = axes_flat[i]
# Filter data for the current period
dfp = er_results_df[er_results_df["period"] == period]
x = dfp["mean_NDVI_2km"].values
y = dfp["avg_TX"].values
season = ndvi_name_to_season(period)
line_color = season_to_color.get(season, "red")
if len(dfp) >= 2:
sns.regplot(
x=x,
y=y,
ax=ax,
line_kws={"color": line_color, "linewidth": 2},
)
slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
title = f"r={r_value:.3f}, R²={r_value**2:.3f}, p={p_value:.1e}"
else:
title = "insufficient data"
title = f"{season} ({period})\n{title}"
ax.set_title(title, fontsize=10)
# Labels on outer edges to reduce clutter
row_idx, col_idx = divmod(i, ncols)
if row_idx == nrows - 1:
ax.set_xlabel("Mean NDVI (1km buffer)")
if col_idx == 0:
ax.set_ylabel("Average Maximum Temperature (°C)")
# Hide unused axes (if any)
for j in range(i + 1, len(axes_flat)):
axes_flat[j].axis("off")
fig.suptitle("TX vs NDVI by Period", fontsize=14, y=0.98)
plt.tight_layout(rect=(0, 0, 1, 0.96))
plt.show()
graph_tx_ndvi_correlation(tx_ndvi_df)
For the selected region, we can see a trend in the TX-NDVI correlation across the same season in each year.
In winter, a lower NDVI (less vegetation) corresponds to lower TX values, while in summer, a higher NDVI (more vegetation) corresponds to lower TX values, indicating the cooling effect of vegetation during hotter months.
If we do the same for a different region, we might observe different patterns in the TX-NDVI correlation, reflecting the unique climatic and vegetative characteristics of that area. Let's try with the city of Modena:
TX-NDVI Correlation Analysis for other cities¶
modena_gdf = gadm_gdf[gadm_gdf["NAME_2"] == "Modena"].dissolve()
modena_stations_gdf = stations_gdf[stations_gdf.within(modena_gdf.union_all())]
tx_ndvi_df = prepare_tx_ndvi_corr_df(
region_gdf=modena_gdf,
stations_gdf=modena_stations_gdf,
buffer_meters=1000,
)
graph_tx_ndvi_correlation(tx_ndvi_df)
Reading station TX data: 100%|███████████████████████████████████████| 44/44 [00:00<00:00, 85.25it/s] TX-NDVI: 100%|██████████████████████| 704/704 [00:34<00:00, 20.58it/s, Period: 2023-09-01_2023-12-01]
The region of Modena shows a similar trend to Bologna, with a clear negative correlation between TX and NDVI during the summer months.
ravenna_gdf = gadm_gdf[gadm_gdf["NAME_2"] == "Ravenna"].dissolve()
ravenna_stations_gdf = stations_gdf[stations_gdf.within(ravenna_gdf.union_all())]
tx_ndvi_df = prepare_tx_ndvi_corr_df(
region_gdf=ravenna_gdf,
stations_gdf=ravenna_stations_gdf,
buffer_meters=1000,
)
graph_tx_ndvi_correlation(tx_ndvi_df)
Reading station TX data: 100%|███████████████████████████████████████| 30/30 [00:00<00:00, 88.56it/s] TX-NDVI: 100%|██████████████████████| 480/480 [00:27<00:00, 17.16it/s, Period: 2023-09-01_2023-12-01]
For the city of Ravenna, the TX-NDVI correlation analysis reveals a less pronounced relationship compared to Bologna and Modena.
This analysis is reasonable given Ravenna being a coastal city, where maritime influences can moderate temperature variations and potentially diminish the impact of vegetation on local temperatures.